package com.tbit.uqbike.tergateway.entity;

import com.tbit.uqbike.TerGatewayMain;
import com.tbit.uqbike.tergateway.config.GateXmlConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.util.concurrent.locks.ReentrantLock;

public class MqttConnInfo extends AConnInfo {
    private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttConnInfo.class);

    // 此处经常出现问题，发现是等待应答超时导致，因为没有并发量临时加一个全局同步，尝试解决
    private static ReentrantLock lock = new ReentrantLock();

    @Override
    public ChannelHandlerContext getCtx() {
        return null;
    }

    @Override
    public boolean downMsg(ByteBuf byteBuf) {
        try {
            lock.lock();

            String topic = String.format("%s-%s_%s-get", GateXmlConfig.platform, GateXmlConfig.productKey, mno);
            //String topic = String.format("R57ADZFf2sZ-%s-get", mno);

            byte[] bs = new byte[byteBuf.writerIndex()];
            byteBuf.readBytes(bs);
            Message<byte[]> message = MessageBuilder.withPayload(bs).setHeader(MqttHeaders.TOPIC, topic).build();
            TerGatewayMain.mqttSend.handleMessage(message);
            return true;
        } catch (Exception e) {
            logger.error("downMsg", e);
        } finally {
            lock.unlock();
            if (byteBuf != null) {
                byteBuf.release();
            }
        }
        return false;
    }

    @Override
    public void closeConn() {
    }
}
